/*** Eclipse Class Decompiler plugin, copyright (c) 2012 Chao Chen (cnfree2000@hotmail.com) ***/
package com.sankuai.xm.kafka.client.factory;

import com.sankuai.xm.kafka.client.IConsumerProcessor;
import com.sankuai.xm.kafka.client.IMessageListener;
import com.sankuai.xm.kafka.client.exception.ConsumerRuntimeException;
import com.sankuai.xm.kafka.client.utils.NamedThreadFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IConsumerProcessor} that consumes one topic through a
 * {@link ConsumerConnector}: it asks the connector for {@code numThreads}
 * message streams and submits one {@code ConsumerMsgTask} per stream to a
 * fixed-size thread pool.
 */
public class DefaultConsumerProcessor implements IConsumerProcessor {
	private static final Logger log = LoggerFactory
			.getLogger(DefaultConsumerProcessor.class);

	/** Capacity of the bounded work queue backing the consumer thread pool. */
	private static final int WORK_QUEUE_CAPACITY = 100000;

	/** Name passed to the thread factory of the consumer thread pool. */
	private static final String EXECUTOR_THREAD_NAME = "XM-KAFKA-CLIENT-CONSUMER-EXECUTOR";

	/** Label reported as "Thread Name" in the rejection diagnostic message. */
	private static final String REJECT_HANDLER_NAME = "XM-KAFKA-CLIENT-CONSUMER-ABORD-REJECTD-PROCESSOR";

	private final String topic;
	private final int numThreads;
	private final ConsumerConnector consumer;

	/** Created by {@link #recvMessageWithParallel}; {@code null} until then. */
	private ExecutorService executor;

	/**
	 * @param consumer   connector used to create the message streams and shut
	 *                   down on {@link #close()}
	 * @param topic      topic to consume
	 * @param numThreads number of streams / worker threads; must be positive
	 *                   (checked when consumption starts, not here)
	 */
	public DefaultConsumerProcessor(ConsumerConnector consumer, String topic,
			int numThreads) {
		this.consumer = consumer;
		this.topic = topic;
		this.numThreads = numThreads;
	}

	/**
	 * Starts consuming the topic: creates {@code numThreads} streams, creates
	 * the thread pool and submits one {@code ConsumerMsgTask} per stream.
	 *
	 * @param clz              not used by this implementation
	 * @param iMessageListener listener handed to every {@code ConsumerMsgTask}
	 * @throws ConsumerRuntimeException if {@code numThreads} is not positive,
	 *                                  or the connector returned no stream
	 *                                  list for the topic
	 */
	// Raw types are kept on purpose: the stream element types depend on the
	// Kafka client version and on ConsumerMsgTask's constructor, neither of
	// which is visible from this file.
	@SuppressWarnings({ "rawtypes", "unchecked" })
	public void recvMessageWithParallel(Class clz, IMessageListener iMessageListener) {
		if (this.numThreads <= 0) {
			log.error("xm-kafka-client, consumer.numThreads is not illegality, topic : {}", this.topic);
			throw new ConsumerRuntimeException("xm-kafka-client, consumer.numThreads is not illegality");
		}

		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(this.topic, Integer.valueOf(this.numThreads));
		Map consumerMap = this.consumer.createMessageStreams(topicCountMap);
		List<KafkaStream> streams = (List<KafkaStream>) consumerMap.get(this.topic);
		if (streams == null) {
			// Fail before the pool exists so no idle thread pool is left behind.
			log.error("xm-kafka-client, consumer got no message streams, topic : {}", this.topic);
			throw new ConsumerRuntimeException("xm-kafka-client, consumer got no message streams");
		}

		this.executor = createConsumerExecutor(this.numThreads);
		for (KafkaStream stream : streams) {
			this.executor.submit(new ConsumerMsgTask(stream, iMessageListener));
		}
		log.info("xm-kafka-client, consumer.recv start, topic : {}", this.topic);
	}

	/**
	 * Shuts down the thread pool (if consumption was ever started) and then
	 * the connector. The connector is shut down even when shutting down the
	 * pool throws.
	 */
	public void close() throws Exception {
		try {
			// executor is only created by recvMessageWithParallel; close()
			// may legitimately be called without it ever having run.
			if (this.executor != null) {
				this.executor.shutdown();
			}
		} finally {
			if (this.consumer != null) {
				this.consumer.shutdown();
			}
		}
	}

	/**
	 * Builds a fixed-size pool ({@code numThreads} core and max threads, 60s
	 * keep-alive, bounded queue) whose rejection handler logs a diagnostic
	 * snapshot of the pool and then throws {@link RejectedExecutionException}.
	 * The caller is responsible for storing the returned executor.
	 */
	private ExecutorService createConsumerExecutor(int numThreads) {
		return new ThreadPoolExecutor(numThreads, numThreads, 60L,
				TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY),
				// NOTE(review): the boolean is presumably the daemon flag — confirm in NamedThreadFactory.
				new NamedThreadFactory(EXECUTOR_THREAD_NAME, true),
				new ThreadPoolExecutor.AbortPolicy() {
					public void rejectedExecution(Runnable r,
							ThreadPoolExecutor e) {
						String msg = String
								.format("Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s) !",
										REJECT_HANDLER_NAME,
										e.getPoolSize(),
										e.getActiveCount(),
										e.getCorePoolSize(),
										e.getMaximumPoolSize(),
										e.getLargestPoolSize(),
										e.getTaskCount(),
										e.getCompletedTaskCount(),
										e.isShutdown(),
										e.isTerminated(),
										e.isTerminating());

						// Log through the class logger: the handler used to
						// carry its own never-initialised logger field, which
						// turned every rejection into a NullPointerException.
						log.warn(msg);
						throw new RejectedExecutionException(msg);
					}
				});
	}
}